/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq;

import java.io.File;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URL;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.TransactionRolledBackException;

import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.blob.BlobUploader;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.management.JMSSessionStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.LongSequenceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * <P>
 * A <CODE>Session</CODE> object is a single-threaded context for producing and consuming messages. Although it may
 * allocate provider resources outside the Java virtual machine (JVM), it is considered a lightweight JMS object.
 * <P>
 * A session serves several purposes:
 * <UL>
 * <LI>It is a factory for its message producers and consumers.
 * <LI>It supplies provider-optimized message factories.
 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and <CODE>TemporaryQueues</CODE>.
 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> objects for those clients that need to
 * dynamically manipulate provider-specific destination names.
 * <LI>It supports a single series of transactions that combine work spanning its producers and consumers into atomic
 * units.
 * <LI>It defines a serial order for the messages it consumes and the messages it produces.
 * <LI>It retains messages it consumes until they have been acknowledged.
 * <LI>It serializes execution of message listeners registered with its message consumers.
 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
 * </UL>
 * <P>
 * A session can create and service multiple message producers and consumers.
 * <P>
 * One typical use is to have a thread block on a synchronous <CODE>MessageConsumer</CODE> until a message arrives. The
 * thread may then use one or more of the <CODE>Session</CODE>'s <CODE>MessageProducer</CODE>s.
 * <P>
 * If a client desires to have one thread produce messages while others consume them, the client should use a separate
 * session for its producing thread.
 * <P>
 * Once a connection has been started, any session with one or more registered message listeners is dedicated to the
 * thread of control that delivers messages to it. It is erroneous for client code to use this session or any of its
 * constituent objects from another thread of control. The only exception to this rule is the use of the session or
 * connection <CODE>close</CODE> method.
 * <P>
 * It should be easy for most clients to partition their work naturally into sessions. This model allows clients to
 * start simply and incrementally add message processing complexity as their need for concurrency grows.
 * <P>
 * The <CODE>close</CODE> method is the only session method that can be called while some other session method is being
 * executed in another thread.
 * <P>
 * A session may be specified as transacted. Each transacted session supports a single series of transactions. Each
 * transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect,
 * transactions organize a session's input message stream and output message stream into series of atomic units. When a
 * transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a
 * transaction rollback is done, the transaction's sent messages are destroyed and the session's input is automatically
 * recovered.
 * <P>
 * The content of a transaction's input and output units is simply those messages that have been produced and consumed
 * within the session's current transaction.
 * <P>
 * A transaction is completed using either its session's <CODE>commit</CODE> method or its session's
 * <CODE>rollback</CODE> method. The completion of a session's current transaction automatically begins the next. The
 * result is that a transacted session always has a current transaction within which its work is done.
 * <P>
 * The Java Transaction Service (JTS) or some other transaction monitor may be used to combine a session's transaction
 * with transactions on other resources (databases, other JMS sessions, etc.). Since Java distributed transactions are
 * controlled via the Java Transaction API (JTA), use of the session's <CODE>commit</CODE> and <CODE>rollback</CODE>
 * methods in this context is prohibited.
 * <P>
 * The JMS API does not require support for JTA; however, it does define how a provider supplies this support.
 * <P>
 * Although it is also possible for a JMS client to handle distributed transactions directly, it is unlikely that many
 * JMS clients will do this. Support for JTA in the JMS API is targeted at systems vendors who will be integrating the
 * JMS API into their application server products.
 *
 *
 * @see javax.jms.Session
 * @see javax.jms.QueueSession
 * @see javax.jms.TopicSession
 * @see javax.jms.XASession
 */
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher
{

	/**
	 * Acknowledgement mode in which <CODE>message.acknowledge()</CODE> acknowledges only that individual message, as
	 * opposed to CLIENT_ACKNOWLEDGE, which acknowledges all messages consumed by a session at the time
	 * <CODE>acknowledge()</CODE> is called.
	 */
	public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
	// Alias of INDIVIDUAL_ACKNOWLEDGE. NOTE(review): presumably the highest valid acknowledgement-mode value used
	// for range checks elsewhere - confirm against callers.
	public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;

	/**
	 * Callback pair that receives the session and the message around a delivery. The invocation sites are outside this
	 * section of the file; judging by the method names one callback fires before and one after a message is delivered.
	 */
	public interface DeliveryListener
	{
		/**
		 * @param session
		 *           the session the message belongs to
		 * @param msg
		 *           the message about to be delivered
		 */
		void beforeDelivery(ActiveMQSession session, Message msg);

		/**
		 * @param session
		 *           the session the message belongs to
		 * @param msg
		 *           the message that has been delivered
		 */
		void afterDelivery(ActiveMQSession session, Message msg);
	}

	private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
	// Executor obtained from the owning connection in the constructor.
	private final ThreadPoolExecutor connectionExecutor;

	// Acknowledge mode handed to the constructor; returned by getAcknowledgeMode().
	protected int acknowledgementMode;
	// Connection that created this session; the session registers itself with it on construction.
	protected final ActiveMQConnection connection;
	// Session descriptor built from the connection info and the session id; sent through the connection on
	// construction and used to create the remove command on close.
	protected final SessionInfo info;
	protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
	protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
	protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
	// Per-session executor; closed by dispose() and drained by run().
	protected final ActiveMQSessionExecutor executor;
	protected final AtomicBoolean started = new AtomicBoolean(false);

	// Consumers and producers owned by this session; both lists are shared with the stats object and are emptied
	// by dispose().
	protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
	protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();

	// Set to true at the end of dispose(); consulted by checkClosed() and isClosed().
	protected boolean closed;
	// True while close() has deferred the actual close to the completion of an XA transaction.
	private volatile boolean synchronizationRegistered;
	protected boolean asyncDispatch;
	protected boolean sessionAsyncDispatch;
	// Snapshot of LOG.isDebugEnabled() taken when the session was constructed.
	protected final boolean debug;
	protected final Object sendMutex = new Object();
	protected final Object redeliveryGuard = new Object();

	// Ensures only one round of consumer-clearing tasks is scheduled at a time by clearMessagesInProgress().
	private final AtomicBoolean clearInProgress = new AtomicBoolean();

	private MessageListener messageListener;
	private final JMSSessionStatsImpl stats;
	// Nulled out by dispose().
	private TransactionContext transactionContext;
	private DeliveryListener deliveryListener;
	private MessageTransformer transformer;
	private BlobTransferPolicy blobTransferPolicy;
	// Raised in dispose() to the highest value reported by the session's consumers and sent with the remove
	// command in doClose(). NOTE(review): the meaning of the initial value -2 is not visible here - confirm.
	private long lastDeliveredSequenceId = -2;

	/**
	 * Constructs the session, announces it through the connection and registers it with that connection. If the
	 * connection is already started, the session is started as well before the constructor returns.
	 *
	 * @param connection
	 *           the connection that owns this session
	 * @param sessionId
	 *           the id whose value is used to build this session's <CODE>SessionInfo</CODE>
	 * @param acknowledgeMode
	 *           n.b if transacted - the acknowledgeMode == Session.SESSION_TRANSACTED
	 * @param asyncDispatch
	 *           stored in the <CODE>asyncDispatch</CODE> field
	 * @param sessionAsyncDispatch
	 *           stored in the <CODE>sessionAsyncDispatch</CODE> field
	 * @throws JMSException
	 *            on internal error
	 */
	protected ActiveMQSession(final ActiveMQConnection connection, final SessionId sessionId, final int acknowledgeMode,
			final boolean asyncDispatch, final boolean sessionAsyncDispatch) throws JMSException
	{
		// capture the debug flag once; it is not refreshed afterwards
		this.debug = LOG.isDebugEnabled();
		this.connection = connection;
		this.acknowledgementMode = acknowledgeMode;
		this.asyncDispatch = asyncDispatch;
		this.sessionAsyncDispatch = sessionAsyncDispatch;
		this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
		setTransactionContext(new TransactionContext(connection));
		// the stats object shares the live producer and consumer lists
		stats = new JMSSessionStatsImpl(producers, consumers);
		// announce the new session through the connection without waiting for a response
		this.connection.asyncSendPacket(info);
		// inherit transformer and blob transfer policy from the connection
		setTransformer(connection.getTransformer());
		setBlobTransferPolicy(connection.getBlobTransferPolicy());
		this.connectionExecutor = connection.getExecutor();
		this.executor = new ActiveMQSessionExecutor(this);
		// register with the connection only after the executor exists
		connection.addSession(this);
		if (connection.isStarted())
		{
			start();
		}

	}

	/**
	 * Convenience constructor that delegates to the five-argument constructor with <CODE>sessionAsyncDispatch</CODE>
	 * set to <CODE>true</CODE>.
	 *
	 * @param connection
	 *           the connection that owns this session
	 * @param sessionId
	 *           the id of the new session
	 * @param acknowledgeMode
	 *           the acknowledgement mode of the session
	 * @param asyncDispatch
	 *           stored in the <CODE>asyncDispatch</CODE> field
	 * @throws JMSException
	 *            on internal error
	 */
	protected ActiveMQSession(final ActiveMQConnection connection, final SessionId sessionId, final int acknowledgeMode,
			final boolean asyncDispatch) throws JMSException
	{
		this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
	}

	/**
	 * Replaces the transaction context used by this session.
	 *
	 * @param transactionContext
	 *           the context through which this session's transactions are committed and rolled back
	 */
	public void setTransactionContext(final TransactionContext transactionContext)
	{
		// plain field assignment; no validation is performed on the new context
		this.transactionContext = transactionContext;
	}

	/**
	 * Gives access to the transaction context currently held by this session.
	 *
	 * @return the session's transaction context; <CODE>null</CODE> once the session has been disposed
	 */
	public TransactionContext getTransactionContext()
	{
		return this.transactionContext;
	}

	/**
	 * Exposes this session's statistics through the generic <CODE>StatsCapable</CODE> contract.
	 *
	 * @return the same statistics object that {@link #getSessionStats()} returns
	 * @see org.apache.activemq.management.StatsCapable#getStats()
	 */
	@Override
	public StatsImpl getStats()
	{
		return this.stats;
	}

	/**
	 * Exposes this session's statistics with their concrete type.
	 *
	 * @return the statistics object created for this session in the constructor
	 */
	public JMSSessionStatsImpl getSessionStats()
	{
		return this.stats;
	}

	/**
	 * Creates a <CODE>BytesMessage</CODE>, a message carrying a stream of uninterpreted bytes, and associates it with
	 * this session's connection.
	 *
	 * @return a new ActiveMQBytesMessage
	 * @throws JMSException
	 *            if this session is closed.
	 */
	@Override
	public BytesMessage createBytesMessage() throws JMSException
	{
		final ActiveMQBytesMessage bytesMessage = new ActiveMQBytesMessage();
		// verifies the session is open and attaches the connection
		configureMessage(bytesMessage);
		return bytesMessage;
	}

	/**
	 * Creates a <CODE>MapMessage</CODE>, a message carrying a self-defining set of name-value pairs whose names are
	 * <CODE>String</CODE> objects and whose values are Java primitive values, and associates it with this session's
	 * connection.
	 *
	 * @return a new ActiveMQMapMessage
	 * @throws JMSException
	 *            if this session is closed.
	 */
	@Override
	public MapMessage createMapMessage() throws JMSException
	{
		final ActiveMQMapMessage mapMessage = new ActiveMQMapMessage();
		// verifies the session is open and attaches the connection
		configureMessage(mapMessage);
		return mapMessage;
	}

	/**
	 * Creates a plain <CODE>Message</CODE>, the root type of all JMS messages, holding only the standard header
	 * information, and associates it with this session's connection.
	 *
	 * @return a new ActiveMQMessage
	 * @throws JMSException
	 *            if this session is closed.
	 */
	@Override
	public Message createMessage() throws JMSException
	{
		final ActiveMQMessage plainMessage = new ActiveMQMessage();
		// verifies the session is open and attaches the connection
		configureMessage(plainMessage);
		return plainMessage;
	}

	/**
	 * Creates an empty <CODE>ObjectMessage</CODE>, a message that carries a serializable Java object, and associates it
	 * with this session's connection.
	 *
	 * @return a new ActiveMQObjectMessage without a payload
	 * @throws JMSException
	 *            if this session is closed.
	 */
	@Override
	public ObjectMessage createObjectMessage() throws JMSException
	{
		final ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage();
		// verifies the session is open and attaches the connection
		configureMessage(objectMessage);
		return objectMessage;
	}

	/**
	 * Creates an <CODE>ObjectMessage</CODE> that is associated with this session's connection and initialized with the
	 * given serializable object.
	 *
	 * @param object
	 *           the payload to store in the new message
	 * @return a new ActiveMQObjectMessage holding <CODE>object</CODE>
	 * @throws JMSException
	 *            if this session is closed or the payload cannot be set on the message.
	 */
	@Override
	public ObjectMessage createObjectMessage(final Serializable object) throws JMSException
	{
		final ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage();
		// the connection is attached before the payload is set
		configureMessage(objectMessage);
		objectMessage.setObject(object);
		return objectMessage;
	}

	/**
	 * Creates a <CODE>StreamMessage</CODE>, a message carrying a self-defining stream of Java primitive values, and
	 * associates it with this session's connection.
	 *
	 * @return a new ActiveMQStreamMessage
	 * @throws JMSException
	 *            if this session is closed.
	 */
	@Override
	public StreamMessage createStreamMessage() throws JMSException
	{
		final ActiveMQStreamMessage streamMessage = new ActiveMQStreamMessage();
		// verifies the session is open and attaches the connection
		configureMessage(streamMessage);
		return streamMessage;
	}

	/**
	 * Creates an empty <CODE>TextMessage</CODE>, a message carrying a <CODE>String</CODE>, and associates it with this
	 * session's connection.
	 *
	 * @return a new ActiveMQTextMessage without text
	 * @throws JMSException
	 *            if this session is closed.
	 */
	@Override
	public TextMessage createTextMessage() throws JMSException
	{
		final ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
		// verifies the session is open and attaches the connection
		configureMessage(textMessage);
		return textMessage;
	}

	/**
	 * Creates a <CODE>TextMessage</CODE> that is associated with this session's connection and initialized with the
	 * given text.
	 *
	 * @param text
	 *           the string used to initialize this message
	 * @return a new ActiveMQTextMessage holding <CODE>text</CODE>
	 * @throws JMSException
	 *            if this session is closed or the text cannot be set on the message.
	 */
	@Override
	public TextMessage createTextMessage(final String text) throws JMSException
	{
		final ActiveMQTextMessage message = new ActiveMQTextMessage();
		// Check the session state and attach the connection first, like every other factory method in this class,
		// so that a closed session is rejected before any work is done on the message.
		configureMessage(message);
		message.setText(text);
		return message;
	}

	/**
	 * Creates a <CODE>BlobMessage</CODE> that refers to a network addressable BLOB by its <CODE>URL</CODE>. Delegates
	 * to {@link #createBlobMessage(URL, boolean)} with <CODE>deletedByBroker</CODE> set to <CODE>false</CODE>.
	 *
	 * @param url
	 *           the network addressable URL used to pass directly to the consumer
	 * @return a BlobMessage
	 * @throws JMSException
	 *            if this session is closed.
	 */
	public BlobMessage createBlobMessage(final URL url) throws JMSException
	{
		final boolean deletedByBroker = false;
		return createBlobMessage(url, deletedByBroker);
	}

	/**
	 * Creates a <CODE>BlobMessage</CODE> that refers to a network addressable BLOB by its <CODE>URL</CODE>. The message
	 * is associated with this session's connection and given a downloader built from the session's blob transfer
	 * policy.
	 *
	 * @param url
	 *           the network addressable URL used to pass directly to the consumer
	 * @param deletedByBroker
	 *           indicates whether or not the resource is deleted by the broker when the message is acknowledged
	 * @return a BlobMessage
	 * @throws JMSException
	 *            if this session is closed.
	 */
	public BlobMessage createBlobMessage(final URL url, final boolean deletedByBroker) throws JMSException
	{
		final ActiveMQBlobMessage blobMessage = new ActiveMQBlobMessage();
		configureMessage(blobMessage);
		blobMessage.setURL(url);
		blobMessage.setDeletedByBroker(deletedByBroker);
		final BlobDownloader downloader = new BlobDownloader(getBlobTransferPolicy());
		blobMessage.setBlobDownloader(downloader);
		return blobMessage;
	}

	/**
	 * Creates a <CODE>BlobMessage</CODE> whose content is taken from a <CODE>File</CODE>. Before the message is sent the
	 * file content will be uploaded to the broker or some other remote repository depending on the
	 * {@link #getBlobTransferPolicy()}. The message is named after the file and is marked as deleted by the broker.
	 *
	 * @param file
	 *           the file to be uploaded to some remote repo (or the broker) depending on the strategy
	 * @return a BlobMessage
	 * @throws JMSException
	 *            if this session is closed.
	 */
	public BlobMessage createBlobMessage(final File file) throws JMSException
	{
		final ActiveMQBlobMessage blobMessage = new ActiveMQBlobMessage();
		configureMessage(blobMessage);
		final BlobUploader uploader = new BlobUploader(getBlobTransferPolicy(), file);
		blobMessage.setBlobUploader(uploader);
		final BlobDownloader downloader = new BlobDownloader(getBlobTransferPolicy());
		blobMessage.setBlobDownloader(downloader);
		blobMessage.setDeletedByBroker(true);
		blobMessage.setName(file.getName());
		return blobMessage;
	}

	/**
	 * Creates a <CODE>BlobMessage</CODE> whose content is read from an <CODE>InputStream</CODE>. Before the message is
	 * sent the stream content will be uploaded to the broker or some other remote repository depending on the
	 * {@link #getBlobTransferPolicy()}. The message is marked as deleted by the broker.
	 * <p>
	 * The caller of this method is responsible for closing the input stream that is used, however the stream can not be
	 * closed until <b>after</b> the message has been sent. To have this class manage the stream and close it
	 * automatically, use the method {@link ActiveMQSession#createBlobMessage(File)}
	 *
	 * @param in
	 *           the stream to be uploaded to some remote repo (or the broker) depending on the strategy
	 * @return a BlobMessage
	 * @throws JMSException
	 *            if this session is closed.
	 */
	public BlobMessage createBlobMessage(final InputStream in) throws JMSException
	{
		final ActiveMQBlobMessage blobMessage = new ActiveMQBlobMessage();
		configureMessage(blobMessage);
		final BlobUploader uploader = new BlobUploader(getBlobTransferPolicy(), in);
		blobMessage.setBlobUploader(uploader);
		final BlobDownloader downloader = new BlobDownloader(getBlobTransferPolicy());
		blobMessage.setBlobDownloader(downloader);
		blobMessage.setDeletedByBroker(true);
		return blobMessage;
	}

	/**
	 * Reports whether this session is transacted, after verifying that the session is still open.
	 *
	 * @return the result of <CODE>isTransacted()</CODE>
	 * @throws JMSException
	 *            if this session is closed.
	 */
	@Override
	public boolean getTransacted() throws JMSException
	{
		checkClosed();
		final boolean transacted = isTransacted();
		return transacted;
	}

	/**
	 * Returns the acknowledgement mode of the session, after verifying that the session is still open. The mode is the
	 * value that was supplied when the session was constructed; for a transacted session that value is expected to be
	 * SESSION_TRANSACTED.
	 *
	 * @return the acknowledgement mode this session was created with
	 * @throws JMSException
	 *            if this session is closed.
	 * @see javax.jms.Connection#createSession(boolean,int)
	 * @since 1.1
	 */
	@Override
	public int getAcknowledgeMode() throws JMSException
	{
		checkClosed();
		final int mode = this.acknowledgementMode;
		return mode;
	}

	/**
	 * Commits the session's current transaction by delegating to the transaction context.
	 *
	 * @throws JMSException
	 *            if the session is closed or the transaction context fails to commit.
	 * @throws TransactionRolledBackException
	 *            if the transaction is rolled back due to some internal error during commit.
	 * @throws javax.jms.IllegalStateException
	 *            if the method is not called by a transacted session.
	 */
	@Override
	public void commit() throws JMSException
	{
		checkClosed();
		final boolean transacted = getTransacted();
		if (!transacted)
		{
			// IllegalStateException here is the javax.jms type imported by this file
			throw new IllegalStateException("Not a transacted session");
		}
		if (LOG.isDebugEnabled())
		{
			final String trace = getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId();
			LOG.debug(trace);
		}
		transactionContext.commit();
	}

	/**
	 * Rolls back the session's current transaction by delegating to the transaction context.
	 *
	 * @throws JMSException
	 *            if the session is closed or the transaction context fails to roll back.
	 * @throws javax.jms.IllegalStateException
	 *            if the method is not called by a transacted session.
	 */
	@Override
	public void rollback() throws JMSException
	{
		checkClosed();
		final boolean transacted = getTransacted();
		if (!transacted)
		{
			// IllegalStateException here is the javax.jms type imported by this file
			throw new IllegalStateException("Not a transacted session");
		}
		if (LOG.isDebugEnabled())
		{
			final String trace = getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId();
			LOG.debug(trace);
		}
		transactionContext.rollback();
	}

	/**
	 * Closes the session.
	 * <P>
	 * Since a provider may allocate some resources on behalf of a session outside the JVM, clients should close the
	 * resources when they are not needed. Relying on garbage collection to eventually reclaim these resources may not be
	 * timely enough.
	 * <P>
	 * There is no need to close the producers and consumers of a closed session.
	 * <P>
	 * When the session is taking part in an XA transaction the close is deferred: a synchronization is registered with
	 * the transaction context (at most once) and the session is actually closed after that transaction commits or rolls
	 * back. Otherwise the session is closed immediately.
	 * <P>
	 * This method is the only <CODE>Session</CODE> method that can be called concurrently.
	 * <P>
	 * Closing a closed session does nothing and does <I>not</I> throw an exception.
	 *
	 * @throws JMSException
	 *            if the JMS provider fails to close the session due to some internal error.
	 */
	@Override
	public void close() throws JMSException
	{
		if (closed)
		{
			return;
		}

		if (!getTransactionContext().isInXATransaction())
		{
			doClose();
			return;
		}

		if (synchronizationRegistered)
		{
			// a deferred close is already pending for the current XA transaction
			return;
		}

		synchronizationRegistered = true;
		final Synchronization deferredClose = new Synchronization()
		{

			@Override
			public void afterCommit() throws Exception
			{
				doClose();
				synchronizationRegistered = false;
			}

			@Override
			public void afterRollback() throws Exception
			{
				doClose();
				synchronizationRegistered = false;
			}
		};
		getTransactionContext().addSynchronization(deferredClose);
	}

	/**
	 * Disposes the session locally and then sends the session's remove command, carrying the last delivered sequence
	 * id, through the connection.
	 *
	 * @throws JMSException
	 *            if disposing the session or sending the remove command fails.
	 */
	private void doClose() throws JMSException
	{
		// dispose() updates lastDeliveredSequenceId, so it has to run before the remove command is populated
		dispose();
		final RemoveInfo sessionRemoval = info.createRemoveCommand();
		sessionRemoval.setLastDeliveredSequenceId(lastDeliveredSequenceId);
		connection.asyncSendPacket(sessionRemoval);
	}

	// Incremented once per call to clearMessagesInProgress(AtomicInteger); never decremented in this section.
	final AtomicInteger clearRequestsCounter = new AtomicInteger(0);

	/**
	 * Clears the messages in progress of the session executor and schedules, for every consumer of this session, a task
	 * that clears that consumer's messages in progress.
	 *
	 * @param transportInterruptionProcessingComplete
	 *           counter that is incremented once for every consumer for which a clearing task is requested
	 */
	void clearMessagesInProgress(final AtomicInteger transportInterruptionProcessingComplete)
	{
		clearRequestsCounter.incrementAndGet();
		executor.clearMessagesInProgress();

		// This runs inside the transport reconnection logic, which clears the dispatch and delivered lists of all
		// of the connection's consumers. Grabbing a mutex here could block on a message listener that is in the
		// middle of a send or an ack, so the per-consumer work is handed to the scheduler instead and completion is
		// reported via connection.transportInterruptionProcessingComplete().
		//
		// The clearInProgress flag prevents a connection that struggles to become fully established from piling up
		// scheduled tasks that clear the same consumers over and over.
		if (consumers.isEmpty())
		{
			return;
		}
		if (!clearInProgress.compareAndSet(false, true))
		{
			return;
		}

		for (final ActiveMQMessageConsumer target : consumers)
		{
			target.inProgressClearRequired();
			transportInterruptionProcessingComplete.incrementAndGet();
			final Runnable clearTask = new Runnable()
			{
				@Override
				public void run()
				{
					target.clearMessagesInProgress();
				}
			};
			try
			{
				connection.getScheduler().executeAfterDelay(clearTask, 0L);
			}
			catch (final JMSException e)
			{
				connection.onClientInternalException(e);
			}
		}

		// Scheduled after the consumer tasks; lowers the flag so that a later call can schedule a new round.
		// NOTE(review): if scheduling this task throws, the flag stays set - confirm whether that is intended.
		final Runnable resetTask = new Runnable()
		{
			@Override
			public void run()
			{
				clearInProgress.set(false);
			}
		};
		try
		{
			connection.getScheduler().executeAfterDelay(resetTask, 0L);
		}
		catch (final JMSException e)
		{
			connection.onClientInternalException(e);
		}
	}

	/**
	 * Asks every consumer of this session to deliver its acks.
	 */
	void deliverAcks()
	{
		for (final ActiveMQMessageConsumer sessionConsumer : consumers)
		{
			sessionConsumer.deliverAcks();
		}
	}

	/**
	 * Releases the local resources of this session: closes the session executor, disposes and forgets all consumers and
	 * producers, and rolls back a local transaction that is still in progress. Whatever happens, the session is
	 * afterwards removed from its connection, its transaction context is dropped and it is marked as closed. Calling
	 * this method on an already closed session does nothing.
	 * <P>
	 * While disposing the consumers, <CODE>lastDeliveredSequenceId</CODE> is raised to the highest value reported by any
	 * of them.
	 *
	 * @throws JMSException
	 *            if closing the executor or disposing a consumer or producer fails; a failure of the final rollback is
	 *            logged and not propagated.
	 */
	public synchronized void dispose() throws JMSException
	{
		if (closed)
		{
			return;
		}

		try
		{
			executor.close();

			for (final ActiveMQMessageConsumer consumer : consumers)
			{
				consumer.setFailureError(connection.getFirstFailureError());
				consumer.dispose();
				lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
			}
			consumers.clear();

			for (final ActiveMQMessageProducer producer : producers)
			{
				producer.dispose();
			}
			producers.clear();

			try
			{
				if (getTransactionContext().isInLocalTransaction())
				{
					rollback();
				}
			}
			catch (final JMSException e)
			{
				// The rollback is best effort: the session is going away regardless, so the failure must not
				// abort the dispose. Record it instead of silently dropping it.
				LOG.debug("{} failed to roll back the local transaction while being disposed", this, e);
			}
		}
		finally
		{
			// always detach from the connection and mark the session closed, even if disposal failed part-way
			connection.removeSession(this);
			this.transactionContext = null;
			closed = true;
		}
	}

	/**
	 * Verifies that the session is open and then attaches this session's connection to the message.
	 *
	 * @param message
	 *           the freshly created message to configure
	 * @throws IllegalStateException
	 *            (the <CODE>javax.jms</CODE> type) if the session is closed
	 */
	protected void configureMessage(final ActiveMQMessage message) throws IllegalStateException
	{
		checkClosed();
		message.setConnection(this.connection);
	}

	/**
	 * Guards operations that require an open session.
	 *
	 * @throws IllegalStateException
	 *            (the <CODE>javax.jms</CODE> type imported by this file) if the Session is closed
	 */
	protected void checkClosed() throws IllegalStateException
	{
		if (!closed)
		{
			return;
		}
		throw new IllegalStateException("The Session is closed");
	}

	/**
	 * Reports the closed state of the session without throwing.
	 *
	 * @return true if the session is closed, false otherwise.
	 */
	public boolean isClosed()
	{
		return this.closed;
	}

	/**
	 * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
	 * <P>
	 * Restarting a session causes it to take the following actions:
	 * <UL>
	 * <LI>Stop message delivery
	 * <LI>Mark all messages that might have been delivered but not acknowledged as "redelivered"
	 * <LI>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
	 * Redelivered messages do not have to be delivered in exactly their original delivery order.
	 * </UL>
	 * <P>
	 * This implementation performs the recovery by rolling back every consumer of the session.
	 *
	 * @throws JMSException
	 *            if the session is closed or a consumer fails to roll back.
	 * @throws IllegalStateException
	 *            if the method is called by a transacted session.
	 */
	@Override
	public void recover() throws JMSException
	{
		checkClosed();
		final boolean transacted = getTransacted();
		if (transacted)
		{
			throw new IllegalStateException("This session is transacted");
		}

		for (final ActiveMQMessageConsumer sessionConsumer : consumers)
		{
			sessionConsumer.rollback();
		}
	}

	/**
	 * Returns the session's distinguished message listener (optional), after verifying that the session is still open.
	 *
	 * @return the message listener associated with this session, or <CODE>null</CODE> if none has been set
	 * @throws JMSException
	 *            if this session is closed.
	 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
	 * @see javax.jms.ServerSessionPool
	 * @see javax.jms.ServerSession
	 */
	@Override
	public MessageListener getMessageListener() throws JMSException
	{
		checkClosed();
		final MessageListener current = this.messageListener;
		return current;
	}

	/**
	 * Sets the session's distinguished message listener (optional).
	 * <P>
	 * When the distinguished message listener is set, no other form of message receipt in the session can be used;
	 * however, all forms of sending messages are still supported.
	 * <P>
	 * If this session has been closed, then an {@link IllegalStateException} is thrown, if trying to set a new listener.
	 * However setting the listener to <tt>null</tt> is allowed, to clear the listener, even if this session has been
	 * closed prior.
	 * <P>
	 * This is an expert facility not used by regular JMS clients.
	 *
	 * @param listener
	 *           the message listener to associate with this session
	 * @throws JMSException
	 *            if the JMS provider fails to set the message listener due to an internal error.
	 * @see javax.jms.Session#getMessageListener()
	 * @see javax.jms.ServerSessionPool
	 * @see javax.jms.ServerSession
	 */
	@Override
	public void setMessageListener(final MessageListener listener) throws JMSException
	{
		// Clearing the listener is always allowed, even on a closed session (for
		// example while an application shuts down), so no closed-check happens here.
		if (listener == null)
		{
			this.messageListener = null;
			return;
		}

		// Installing a new listener requires an open session.
		checkClosed();
		this.messageListener = listener;

		// With a listener in place, flag the executor as dispatched by the session pool.
		executor.setDispatchedBySessionPool(true);
	}

	/**
	 * Optional operation, intended to be used only by Application Servers, not by ordinary JMS clients.
	 *
	 * @see javax.jms.ServerSession
	 */
	@Override
	public void run()
	{
		// Drain the executor queue without blocking: the loop ends as soon as
		// dequeueNoWait() returns null. Each dispatch is either acknowledged early
		// (expired / duplicate) or handed to the session's messageListener.
		MessageDispatch messageDispatch;
		while ((messageDispatch = executor.dequeueNoWait()) != null)
		{
			// final alias so the anonymous classes below can capture the dispatch
			final MessageDispatch md = messageDispatch;
			final ActiveMQMessage message = (ActiveMQMessage) md.getMessage();

			// Messages that are expired or already seen are never delivered to the
			// listener; they are acknowledged straight away with a dedicated ack type.
			MessageAck earlyAck = null;
			if (message.isExpired())
			{
				earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
				earlyAck.setFirstMessageId(message.getMessageId());
			}
			else if (connection.isDuplicate(ActiveMQSession.this, message))
			{
				LOG.debug("{} got duplicate: {}", this, message.getMessageId());
				earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
				earlyAck.setFirstMessageId(md.getMessage().getMessageId());
				earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
			}
			if (earlyAck != null)
			{
				try
				{
					asyncSendPacket(earlyAck);
				}
				catch (final Throwable t)
				{
					LOG.error("error dispatching ack: {} ", earlyAck, t);
					connection.onClientInternalException(t);
				}
				finally
				{
					// NOTE(review): a 'continue' inside 'finally' always moves on to the next
					// dispatch and also discards anything thrown from the catch block above.
					continue;
				}
			}

			// In client/individual acknowledge mode a no-op acknowledge callback is installed
			// on the message (presumably because this method sends the ack itself - confirm).
			if (isClientAcknowledge() || isIndividualAcknowledge())
			{
				message.setAcknowledgeCallback(new Callback()
				{
					@Override
					public void execute() throws Exception
					{
					}
				});
			}

			if (deliveryListener != null)
			{
				deliveryListener.beforeDelivery(this, message);
			}

			md.setDeliverySequenceId(getNextDeliveryId());
			lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();

			// standard ack for this single message; sent either directly (no transaction)
			// or from the transaction synchronization registered below
			final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);

			// set when deliveryListener.afterDelivery() throws; read by the redelivery task
			final AtomicBoolean afterDeliveryError = new AtomicBoolean(false);
			/*
			 * The redelivery guard is to allow the endpoint lifecycle to complete before the message is dispatched. We
			 * don't want the after delivery being called after the redelivery as it may cause some weird stuff.
			 */
			synchronized (redeliveryGuard)
			{
				try
				{
					ack.setFirstMessageId(md.getMessage().getMessageId());
					doStartTransaction();
					ack.setTransactionId(getTransactionContext().getTransactionId());
					// only register a synchronization when the ack is actually part of a transaction
					if (ack.getTransactionId() != null)
					{
						getTransactionContext().addSynchronization(new Synchronization()
						{

							// Snapshot of the clear-request counter taken when this synchronization is
							// created. If the counter is already at Integer.MAX_VALUE it is incremented
							// first and that result is used (presumably so the '>' comparison in
							// afterRollback() still works across wrap-around - confirm).
							final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE
									? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());

							@Override
							public void beforeEnd() throws Exception
							{
								// validate our consumer so we don't push stale acks that get ignored
								if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId()))
								{
									LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
									throw new TransactionRolledBackException(
											"consumer " + ack.getConsumerId() + " no longer active on " + connection);
								}
								LOG.trace("beforeEnd ack {}", ack);
								sendAck(ack);
							}

							@Override
							public void afterRollback() throws Exception
							{
								// the Throwable is passed only so the trace log carries a stack trace
								LOG.trace("rollback {}", ack, new Throwable("here"));
								// ensure we don't filter this as a duplicate
								connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());

								// don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
								if (clearRequestsCounter.get() > clearRequestCount)
								{
									LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(),
											connection.getTransport());
									return;
								}

								// validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
								if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId()))
								{
									LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md,
											ack.getTransactionId(), connection.getTransport());
									return;
								}

								final RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
								final int redeliveryCounter = md.getMessage().getRedeliveryCounter();
								if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
										&& redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries())
								{
									// We need to NACK the message so that it gets sent to the DLQ.
									// Acknowledge the last message.
									// Note: this local 'ack' shadows the outer standard ack.
									final MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
									ack.setFirstMessageId(md.getMessage().getMessageId());
									ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
									asyncSendPacket(ack);

								}
								else
								{

									// Note: this local 'ack' shadows the outer standard ack.
									final MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
									ack.setFirstMessageId(md.getMessage().getMessageId());
									asyncSendPacket(ack);

									// Figure out how long we should wait to resend this message: start from
									// the initial delay and apply the policy's next-delay step once per
									// previous redelivery.
									long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
									for (int i = 0; i < redeliveryCounter; i++)
									{
										redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
									}

									/*
									 * If redelivery is blocking (i.e. NOT non-blocking) then we need to stop the executor to
									 * avoid more messages being delivered; once the message is redelivered we can restart it.
									 */
									if (!connection.isNonBlockingRedelivery())
									{
										LOG.debug("Blocking session until re-delivery...");
										executor.stop();
									}

									// schedule the actual redelivery after the computed delay
									connection.getScheduler().executeAfterDelay(new Runnable()
									{

										@Override
										public void run()
										{
											/*
											 * wait for the first delivery to be complete, i.e. after delivery has been called.
											 */
											synchronized (redeliveryGuard)
											{
												/*
												 * If it's non-blocking then we can just dispatch in a new session.
												 */
												if (connection.isNonBlockingRedelivery())
												{
													((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
												}
												else
												{
													/*
													 * If there has been an error thrown during afterDelivery then the endpoint will
													 * be marked as dead so redelivery will fail (and eventually the session marked
													 * as stale), in this case we can only call dispatch which will create a new
													 * session with a new endpoint.
													 */
													if (afterDeliveryError.get())
													{
														((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
													}
													else
													{
														// put the message back at the head of the queue, then
														// restart the executor that was stopped above
														executor.executeFirst(md);
														executor.start();
													}
												}
											}
										}
									}, redeliveryDelay);
								}
								// reached for both the poison and the redelivery branch, but not for the early returns above
								md.getMessage().onMessageRolledBack();
							}
						});
					}

					LOG.trace("{} onMessage({})", this, message.getMessageId());
					messageListener.onMessage(message);

				}
				catch (final Throwable e)
				{
					LOG.error("error dispatching message: ", e);

					// A problem while invoking the MessageListener does not
					// in general indicate a problem with the connection to the broker, i.e.
					// it will usually be sufficient to let the afterDelivery() method either
					// commit or roll back in order to deal with the exception.
					// However, we notify any registered client internal exception listener
					// of the problem.
					connection.onClientInternalException(e);
				}
				finally
				{
					// Outside a transaction the standard ack is sent here whether or not the
					// listener threw; inside a transaction it is sent from beforeEnd() instead.
					if (ack.getTransactionId() == null)
					{
						try
						{
							asyncSendPacket(ack);
						}
						catch (final Throwable e)
						{
							connection.onClientInternalException(e);
						}
					}
				}

				if (deliveryListener != null)
				{
					try
					{
						deliveryListener.afterDelivery(this, message);
					}
					catch (final Throwable t)
					{
						LOG.debug("Unable to call after delivery", t);
						// remember the failure for the scheduled redelivery task, then abort run()
						// by rethrowing (the RuntimeException propagates out of the dispatch loop)
						afterDeliveryError.set(true);
						throw new RuntimeException(t);
					}
				}
			}
			/*
			 * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale
			 * anyway. It also needs to be outside the redelivery guard.
			 */
			try
			{
				executor.waitForQueueRestart();
			}
			catch (final InterruptedException ex)
			{
				// NOTE(review): the interrupt is only reported to the connection; the thread's
				// interrupt flag is not restored here.
				connection.onClientInternalException(ex);
			}
		}
	}

	/**
	 * Creates a <CODE>MessageProducer</CODE> to send messages to the specified destination.
	 * <P>
	 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a destination. Since <CODE>Queue </CODE>
	 * and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the destination parameter
	 * to create a <CODE>MessageProducer</CODE> object.
	 *
	 * @param destination
	 *           the <CODE>Destination</CODE> to send to, or null if this is a producer which does not have a specified
	 *           destination.
	 * @return the MessageProducer
	 * @throws JMSException
	 *            if the session fails to create a MessageProducer due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid destination is specified.
	 * @since 1.1
	 */
	@Override
	public MessageProducer createProducer(final Destination destination) throws JMSException
	{
		checkClosed();

		// Regular (including null) destinations get a standard ActiveMQ producer
		// configured with the connection's send timeout.
		if (!(destination instanceof CustomDestination))
		{
			final int sendTimeout = connection.getSendTimeout();
			return new ActiveMQMessageProducer(this, getNextProducerId(),
					ActiveMQMessageTransformation.transformDestination(destination), sendTimeout);
		}

		// A custom destination builds its own producer for this session.
		return ((CustomDestination) destination).createProducer(this);
	}

	/**
	 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. Since <CODE>Queue</CODE> and
	 * <CODE> Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the destination parameter to
	 * create a <CODE>MessageConsumer</CODE>.
	 *
	 * @param destination
	 *           the <CODE>Destination</CODE> to access.
	 * @return the MessageConsumer
	 * @throws JMSException
	 *            if the session fails to create a consumer due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid destination is specified.
	 * @since 1.1
	 */
	@Override
	public MessageConsumer createConsumer(final Destination destination) throws JMSException
	{
		// No selector; typed as String so the selector-based overload is the one invoked.
		final String noSelector = null;
		return createConsumer(destination, noSelector);
	}

	/**
	 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. Since
	 * <CODE> Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
	 * destination parameter to create a <CODE>MessageConsumer</CODE>.
	 * <P>
	 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been sent to a destination.
	 *
	 * @param destination
	 *           the <CODE>Destination</CODE> to access
	 * @param messageSelector
	 *           only messages with properties matching the message selector expression are delivered. A value of null or
	 *           an empty string indicates that there is no message selector for the message consumer.
	 * @return the MessageConsumer
	 * @throws JMSException
	 *            if the session fails to create a MessageConsumer due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid destination is specified.
	 * @throws InvalidSelectorException
	 *            if the message selector is invalid.
	 * @since 1.1
	 */
	@Override
	public MessageConsumer createConsumer(final Destination destination, final String messageSelector) throws JMSException
	{
		// Delegates with noLocal switched off.
		final boolean noLocal = false;
		return createConsumer(destination, messageSelector, noLocal);
	}

	/**
	 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. Since <CODE>Queue</CODE> and
	 * <CODE> Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the destination parameter to
	 * create a <CODE>MessageConsumer</CODE>.
	 *
	 * @param destination
	 *           the <CODE>Destination</CODE> to access.
	 * @param messageListener
	 *           the listener to use for async consumption of messages
	 * @return the MessageConsumer
	 * @throws JMSException
	 *            if the session fails to create a consumer due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid destination is specified.
	 * @since 1.1
	 */
	public MessageConsumer createConsumer(final Destination destination, final MessageListener messageListener) throws JMSException
	{
		// Delegates without a message selector.
		final String noSelector = null;
		return createConsumer(destination, noSelector, messageListener);
	}

	/**
	 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. Since
	 * <CODE> Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
	 * destination parameter to create a <CODE>MessageConsumer</CODE>.
	 * <P>
	 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been sent to a destination.
	 *
	 * @param destination
	 *           the <CODE>Destination</CODE> to access
	 * @param messageSelector
	 *           only messages with properties matching the message selector expression are delivered. A value of null or
	 *           an empty string indicates that there is no message selector for the message consumer.
	 * @param messageListener
	 *           the listener to use for async consumption of messages
	 * @return the MessageConsumer
	 * @throws JMSException
	 *            if the session fails to create a MessageConsumer due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid destination is specified.
	 * @throws InvalidSelectorException
	 *            if the message selector is invalid.
	 * @since 1.1
	 */
	public MessageConsumer createConsumer(final Destination destination, final String messageSelector,
			final MessageListener messageListener) throws JMSException
	{
		// Delegates with noLocal switched off.
		final boolean noLocal = false;
		return createConsumer(destination, messageSelector, noLocal, messageListener);
	}

	/**
	 * Creates <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. This method can
	 * specify whether messages published by its own connection should be delivered to it, if the destination is a topic.
	 * <P>
	 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in
	 * the destination parameter to create a <CODE>MessageConsumer</CODE>.
	 * <P>
	 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been published to a destination.
	 * <P>
	 * In some cases, a connection may both publish and subscribe to a topic. The consumer <CODE>NoLocal</CODE> attribute
	 * allows a consumer to inhibit the delivery of messages published by its own connection. The default value for this
	 * attribute is False. The <CODE>noLocal</CODE> value must be supported by destinations that are topics.
	 *
	 * @param destination
	 *           the <CODE>Destination</CODE> to access
	 * @param messageSelector
	 *           only messages with properties matching the message selector expression are delivered. A value of null or
	 *           an empty string indicates that there is no message selector for the message consumer.
	 * @param noLocal
	 *           - if true, and the destination is a topic, inhibits the delivery of messages published by its own
	 *           connection. The behavior for <CODE>NoLocal</CODE> is not specified if the destination is a queue.
	 * @return the MessageConsumer
	 * @throws JMSException
	 *            if the session fails to create a MessageConsumer due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid destination is specified.
	 * @throws InvalidSelectorException
	 *            if the message selector is invalid.
	 * @since 1.1
	 */
	@Override
	public MessageConsumer createConsumer(final Destination destination, final String messageSelector, final boolean noLocal)
			throws JMSException
	{
		// Delegates without a message listener.
		final MessageListener noListener = null;
		return createConsumer(destination, messageSelector, noLocal, noListener);
	}

	/**
	 * Creates <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. This method can
	 * specify whether messages published by its own connection should be delivered to it, if the destination is a topic.
	 * <P>
	 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in
	 * the destination parameter to create a <CODE>MessageConsumer</CODE>.
	 * <P>
	 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been published to a destination.
	 * <P>
	 * In some cases, a connection may both publish and subscribe to a topic. The consumer <CODE>NoLocal</CODE> attribute
	 * allows a consumer to inhibit the delivery of messages published by its own connection. The default value for this
	 * attribute is False. The <CODE>noLocal</CODE> value must be supported by destinations that are topics.
	 *
	 * @param destination
	 *           the <CODE>Destination</CODE> to access
	 * @param messageSelector
	 *           only messages with properties matching the message selector expression are delivered. A value of null or
	 *           an empty string indicates that there is no message selector for the message consumer.
	 * @param noLocal
	 *           - if true, and the destination is a topic, inhibits the delivery of messages published by its own
	 *           connection. The behavior for <CODE>NoLocal</CODE> is not specified if the destination is a queue.
	 * @param messageListener
	 *           the listener to use for async consumption of messages
	 * @return the MessageConsumer
	 * @throws JMSException
	 *            if the session fails to create a MessageConsumer due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid destination is specified.
	 * @throws InvalidSelectorException
	 *            if the message selector is invalid.
	 * @since 1.1
	 */
	public MessageConsumer createConsumer(final Destination destination, final String messageSelector, final boolean noLocal,
			final MessageListener messageListener) throws JMSException
	{
		checkClosed();

		if (destination instanceof CustomDestination)
		{
			final CustomDestination customDestination = (CustomDestination) destination;
			final MessageConsumer customConsumer = customDestination.createConsumer(this, messageSelector, noLocal);
			// The custom factory method takes no listener argument. Previously a listener passed
			// by the caller was silently dropped on this path; attach it through the standard
			// JMS API so custom destinations honour it as well.
			if (messageListener != null && customConsumer != null)
			{
				customConsumer.setMessageListener(messageListener);
			}
			return customConsumer;
		}

		// Topics and queues use different prefetch sizes from the connection's prefetch policy;
		// anything that is not a Topic (including null) falls back to the queue prefetch.
		final ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
		final int prefetch;
		if (destination instanceof Topic)
		{
			prefetch = prefetchPolicy.getTopicPrefetch();
		}
		else
		{
			prefetch = prefetchPolicy.getQueuePrefetch();
		}

		final ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
		return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, prefetch,
				prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
	}

	/**
	 * Creates a queue identity given a <CODE>Queue</CODE> name.
	 * <P>
	 * This facility is provided for the rare cases where clients need to dynamically manipulate queue identity. It
	 * allows the creation of a queue identity with a provider-specific name. Clients that depend on this ability are not
	 * portable.
	 * <P>
	 * Note that this method is not for creating the physical queue. The physical creation of queues is an administrative
	 * task and is not to be initiated by the JMS API. The one exception is the creation of temporary queues, which is
	 * accomplished with the <CODE>createTemporaryQueue</CODE> method.
	 *
	 * @param queueName
	 *           the name of this <CODE>Queue</CODE>
	 * @return a <CODE>Queue</CODE> with the given name
	 * @throws JMSException
	 *            if the session fails to create a queue due to some internal error.
	 * @since 1.1
	 */
	@Override
	public Queue createQueue(final String queueName) throws JMSException
	{
		checkClosed();

		// Names without the temp-destination prefix are ordinary queues.
		if (!queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX))
		{
			return new ActiveMQQueue(queueName);
		}

		// Prefixed names map onto a temporary queue identity.
		return new ActiveMQTempQueue(queueName);
	}

	/**
	 * Creates a topic identity given a <CODE>Topic</CODE> name.
	 * <P>
	 * This facility is provided for the rare cases where clients need to dynamically manipulate topic identity. This
	 * allows the creation of a topic identity with a provider-specific name. Clients that depend on this ability are not
	 * portable.
	 * <P>
	 * Note that this method is not for creating the physical topic. The physical creation of topics is an administrative
	 * task and is not to be initiated by the JMS API. The one exception is the creation of temporary topics, which is
	 * accomplished with the <CODE>createTemporaryTopic</CODE> method.
	 *
	 * @param topicName
	 *           the name of this <CODE>Topic</CODE>
	 * @return a <CODE>Topic</CODE> with the given name
	 * @throws JMSException
	 *            if the session fails to create a topic due to some internal error.
	 * @since 1.1
	 */
	@Override
	public Topic createTopic(final String topicName) throws JMSException
	{
		checkClosed();

		// Names without the temp-destination prefix are ordinary topics.
		if (!topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX))
		{
			return new ActiveMQTopic(topicName);
		}

		// Prefixed names map onto a temporary topic identity.
		return new ActiveMQTempTopic(topicName);
	}

	/**
	 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
	 *
	 * @param queue
	 *           the <CODE>queue</CODE> to access
	 * @exception InvalidDestinationException
	 *               if an invalid destination is specified
	 * @since 1.1
	 */
	/**
	 * Creates a durable subscriber to the specified topic.
	 * <P>
	 * If a client needs to receive all the messages published on a topic, including the ones published while the
	 * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of this
	 * durable subscription and insures that all messages from the topic's publishers are retained until they are
	 * acknowledged by this durable subscriber or they have expired.
	 * <P>
	 * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
	 * specify a name that uniquely identifies (within client identifier) each durable subscription it creates. Only one
	 * session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription.
	 * <P>
	 * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with the
	 * same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to unsubscribing
	 * (deleting) the old one and creating a new one.
	 * <P>
	 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
	 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
	 * value for this attribute is false.
	 *
	 * @param topic
	 *           the non-temporary <CODE>Topic</CODE> to subscribe to
	 * @param name
	 *           the name used to identify this subscription
	 * @return the TopicSubscriber
	 * @throws JMSException
	 *            if the session fails to create a subscriber due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid topic is specified.
	 * @since 1.1
	 */
	@Override
	public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException
	{
		checkClosed();

		// Delegates with no selector and noLocal switched off.
		final String noSelector = null;
		final boolean noLocal = false;
		return createDurableSubscriber(topic, name, noSelector, noLocal);
	}

	/**
	 * Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages
	 * published by its own connection should be delivered to it.
	 * <P>
	 * If a client needs to receive all the messages published on a topic, including the ones published while the
	 * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of this
	 * durable subscription and insures that all messages from the topic's publishers are retained until they are
	 * acknowledged by this durable subscriber or they have expired.
	 * <P>
	 * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
	 * specify a name which uniquely identifies (within client identifier) each durable subscription it creates. Only one
	 * session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription. An inactive
	 * durable subscriber is one that exists but does not currently have a message consumer associated with it.
	 * <P>
	 * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with the
	 * same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to unsubscribing
	 * (deleting) the old one and creating a new one.
	 *
	 * @param topic
	 *           the non-temporary <CODE>Topic</CODE> to subscribe to
	 * @param name
	 *           the name used to identify this subscription
	 * @param messageSelector
	 *           only messages with properties matching the message selector expression are delivered. A value of null or
	 *           an empty string indicates that there is no message selector for the message consumer.
	 * @param noLocal
	 *           if set, inhibits the delivery of messages published by its own connection
	 * @return the TopicSubscriber
	 * @throws JMSException
	 *            if the session fails to create a subscriber due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid topic is specified.
	 * @throws InvalidSelectorException
	 *            if the message selector is invalid.
	 * @since 1.1
	 */
	@Override
	public TopicSubscriber createDurableSubscriber(final Topic topic, final String name, final String messageSelector,
			final boolean noLocal) throws JMSException
	{
		checkClosed();

		if (topic == null)
		{
			throw new InvalidDestinationException("Topic cannot be null");
		}

		// A custom destination builds its own durable subscriber.
		if (topic instanceof CustomDestination)
		{
			return ((CustomDestination) topic).createDurableSubscriber(this, name, messageSelector, noLocal);
		}

		connection.checkClientIDWasManuallySpecified();

		// Auto-acknowledge sessions on a connection with optimized dispatch use the
		// "optimize" durable prefetch; every other combination uses the regular one.
		final ActiveMQPrefetchPolicy policy = this.connection.getPrefetchPolicy();
		final int prefetch;
		if (isAutoAcknowledge() && connection.isOptimizedMessageDispatch())
		{
			prefetch = policy.getOptimizeDurableTopicPrefetch();
		}
		else
		{
			prefetch = policy.getDurableTopicPrefetch();
		}
		final int maxPendingLimit = policy.getMaximumPendingMessageLimit();

		return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic),
				name, messageSelector, prefetch, maxPendingLimit, noLocal, false, asyncDispatch);
	}

	/**
	 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
	 *
	 * @param queue
	 *           the <CODE>queue</CODE> to access
	 * @return the Queue Browser
	 * @throws JMSException
	 *            if the session fails to create a browser due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid destination is specified
	 * @since 1.1
	 */
	@Override
	public QueueBrowser createBrowser(final Queue queue) throws JMSException
	{
		checkClosed();

		// Delegates without a message selector.
		final String noSelector = null;
		return createBrowser(queue, noSelector);
	}

	/**
	 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue using a message
	 * selector.
	 *
	 * @param queue
	 *           the <CODE>queue</CODE> to access
	 * @param messageSelector
	 *           only messages with properties matching the message selector expression are delivered. A value of null or
	 *           an empty string indicates that there is no message selector for the message consumer.
	 * @return the Queue Browser
	 * @throws JMSException
	 *            if the session fails to create a browser due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid destination is specified
	 * @throws InvalidSelectorException
	 *            if the message selector is invalid.
	 * @since 1.1
	 */
	@Override
	public QueueBrowser createBrowser(final Queue queue, final String messageSelector) throws JMSException
	{
		checkClosed();

		// The browser gets its own consumer id and inherits this session's asyncDispatch setting.
		final QueueBrowser browser = new ActiveMQQueueBrowser(this, getNextConsumerId(),
				ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
		return browser;
	}

	/**
	 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless it
	 * is deleted earlier.
	 *
	 * @return a temporary queue identity
	 * @throws JMSException
	 *            if the session fails to create a temporary queue due to some internal error.
	 * @since 1.1
	 */
	@Override
	public TemporaryQueue createTemporaryQueue() throws JMSException
	{
		checkClosed();

		// The connection creates the temp destination; 'false' is the flag value used for queues here.
		final boolean topic = false;
		return (TemporaryQueue) connection.createTempDestination(topic);
	}

	/**
	 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless it
	 * is deleted earlier.
	 *
	 * @return a temporary topic identity
	 * @throws JMSException
	 *            if the session fails to create a temporary topic due to some internal error.
	 * @since 1.1
	 */
	@Override
	public TemporaryTopic createTemporaryTopic() throws JMSException
	{
		checkClosed();

		// The connection creates the temp destination; 'true' is the flag value used for topics here.
		final boolean topic = true;
		return (TemporaryTopic) connection.createTempDestination(topic);
	}

	/**
	 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue.
	 *
	 * @param queue
	 *           the <CODE>Queue</CODE> to access
	 * @return a new QueueReceiver instance.
	 * @throws JMSException
	 *            if the session fails to create a receiver due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid queue is specified.
	 */
	@Override
	public QueueReceiver createReceiver(final Queue queue) throws JMSException
	{
		checkClosed();

		// Delegates without a message selector.
		final String noSelector = null;
		return createReceiver(queue, noSelector);
	}

	/**
	 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue using a message selector.
	 * <P>
	 * A queue that is a <CODE>CustomDestination</CODE> builds its own receiver; any other queue gets an
	 * <CODE>ActiveMQQueueReceiver</CODE> configured from the connection's prefetch policy.
	 *
	 * @param queue
	 *           the <CODE>Queue</CODE> to access
	 * @param messageSelector
	 *           only messages with properties matching the message selector expression are delivered. A value of null or
	 *           an empty string indicates that there is no message selector for the message consumer.
	 * @return QueueReceiver
	 * @throws JMSException
	 *            if the session fails to create a receiver due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid queue is specified.
	 * @throws InvalidSelectorException
	 *            if the message selector is invalid.
	 */
	@Override
	public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException
	{
		checkClosed();

		// Custom destinations are responsible for creating their own receiver.
		if (queue instanceof CustomDestination)
		{
			return ((CustomDestination) queue).createReceiver(this, messageSelector);
		}

		final ActiveMQPrefetchPolicy policy = this.connection.getPrefetchPolicy();
		final ConsumerId receiverId = getNextConsumerId();
		return new ActiveMQQueueReceiver(this, receiverId, ActiveMQMessageTransformation.transformDestination(queue),
				messageSelector, policy.getQueuePrefetch(), policy.getMaximumPendingMessageLimit(), asyncDispatch);
	}

	/**
	 * Creates a <CODE>QueueSender</CODE> object to send messages to the specified queue.
	 * <P>
	 * A queue that is a <CODE>CustomDestination</CODE> builds its own sender; any other queue gets an
	 * <CODE>ActiveMQQueueSender</CODE> that is given the connection's send timeout.
	 *
	 * @param queue
	 *           the <CODE>Queue</CODE> to access, or null if this is an unidentified producer
	 * @return QueueSender
	 * @throws JMSException
	 *            if the session fails to create a sender due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid queue is specified.
	 */
	@Override
	public QueueSender createSender(final Queue queue) throws JMSException
	{
		checkClosed();

		// Custom destinations are responsible for creating their own sender.
		if (queue instanceof CustomDestination)
		{
			return ((CustomDestination) queue).createSender(this);
		}

		// The timeout is read before the destination is transformed, as in the original statement order.
		final int connectionSendTimeout = this.connection.getSendTimeout();
		return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),
				connectionSendTimeout);
	}

	/**
	 * Creates a nondurable subscriber to the specified topic.
	 * <P>
	 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
	 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published while
	 * they are active.
	 * <P>
	 * Delegates to {@link #createSubscriber(Topic, String, boolean)} with no message selector and the
	 * <CODE>NoLocal</CODE> attribute set to false.
	 *
	 * @param topic
	 *           the <CODE>Topic</CODE> to subscribe to
	 * @return TopicSubscriber
	 * @throws JMSException
	 *            if the session fails to create a subscriber due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid topic is specified.
	 */
	@Override
	public TopicSubscriber createSubscriber(final Topic topic) throws JMSException
	{
		checkClosed();
		final String noSelector = null;
		final boolean noLocal = false;
		return createSubscriber(topic, noSelector, noLocal);
	}

	/**
	 * Creates a nondurable subscriber to the specified topic, using a message selector or specifying whether messages
	 * published by its own connection should be delivered to it.
	 * <P>
	 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
	 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published while
	 * they are active.
	 * <P>
	 * Messages filtered out by a subscriber's message selector will never be delivered to the subscriber. From the
	 * subscriber's perspective, they do not exist.
	 * <P>
	 * A topic that is a <CODE>CustomDestination</CODE> builds its own subscriber; any other topic gets an
	 * <CODE>ActiveMQTopicSubscriber</CODE> configured from the connection's prefetch policy.
	 *
	 * @param topic
	 *           the <CODE>Topic</CODE> to subscribe to
	 * @param messageSelector
	 *           only messages with properties matching the message selector expression are delivered. A value of null or
	 *           an empty string indicates that there is no message selector for the message consumer.
	 * @param noLocal
	 *           if set, inhibits the delivery of messages published by its own connection
	 * @return TopicSubscriber
	 * @throws JMSException
	 *            if the session fails to create a subscriber due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid topic is specified.
	 * @throws InvalidSelectorException
	 *            if the message selector is invalid.
	 */
	@Override
	public TopicSubscriber createSubscriber(final Topic topic, final String messageSelector, final boolean noLocal)
			throws JMSException
	{
		checkClosed();

		// Custom destinations are responsible for creating their own subscriber.
		if (topic instanceof CustomDestination)
		{
			return ((CustomDestination) topic).createSubscriber(this, messageSelector, noLocal);
		}

		final ActiveMQPrefetchPolicy policy = this.connection.getPrefetchPolicy();
		final ConsumerId subscriberId = getNextConsumerId();
		return new ActiveMQTopicSubscriber(this, subscriberId, ActiveMQMessageTransformation.transformDestination(topic),
				null, messageSelector, policy.getTopicPrefetch(), policy.getMaximumPendingMessageLimit(), noLocal,
				false, asyncDispatch);
	}

	/**
	 * Creates a publisher for the specified topic.
	 * <P>
	 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages on a topic. Each time a client creates a
	 * <CODE>TopicPublisher</CODE> on a topic, it defines a new sequence of messages that have no ordering relationship
	 * with the messages it has previously sent.
	 * <P>
	 * A topic that is a <CODE>CustomDestination</CODE> builds its own publisher; any other topic gets an
	 * <CODE>ActiveMQTopicPublisher</CODE> that is given the connection's send timeout.
	 *
	 * @param topic
	 *           the <CODE>Topic</CODE> to publish to, or null if this is an unidentified producer
	 * @return TopicPublisher
	 * @throws JMSException
	 *            if the session fails to create a publisher due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid topic is specified.
	 */
	@Override
	public TopicPublisher createPublisher(final Topic topic) throws JMSException
	{
		checkClosed();

		// Custom destinations are responsible for creating their own publisher.
		if (topic instanceof CustomDestination)
		{
			return ((CustomDestination) topic).createPublisher(this);
		}

		// The timeout is read before the destination is transformed, as in the original statement order.
		final int connectionSendTimeout = this.connection.getSendTimeout();
		return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),
				connectionSendTimeout);
	}

	/**
	 * Unsubscribes a durable subscription that has been created by a client. The request is forwarded to the owning
	 * connection.
	 * <P>
	 * It is erroneous for a client to delete a durable subscription while there is an active
	 * <CODE>MessageConsumer</CODE> or <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed message is
	 * part of a pending transaction or has not been acknowledged in the session.
	 *
	 * @param name
	 *           the name used to identify this subscription
	 * @throws JMSException
	 *            if the session fails to unsubscribe to the durable subscription due to some internal error.
	 * @throws InvalidDestinationException
	 *            if an invalid subscription name is specified.
	 * @since 1.1
	 */
	@Override
	public void unsubscribe(final String name) throws JMSException
	{
		checkClosed();
		this.connection.unsubscribe(name);
	}

	/**
	 * Hands an incoming dispatch to this session's executor.
	 * <P>
	 * If the hand-off is interrupted, the thread's interrupt flag is restored and the exception is reported to the
	 * connection instead of being propagated to the caller.
	 *
	 * @param messageDispatch
	 *           the dispatch to execute
	 */
	@Override
	public void dispatch(final MessageDispatch messageDispatch)
	{
		try
		{
			this.executor.execute(messageDispatch);
		}
		catch (final InterruptedException interrupted)
		{
			// Restore the interrupt status before notifying the connection.
			Thread.currentThread().interrupt();
			this.connection.onClientInternalException(interrupted);
		}
	}

	/**
	 * Acknowledges all consumed messages of this session by invoking <CODE>acknowledge()</CODE> on every consumer
	 * currently registered with it.
	 * <P>
	 * By invoking <CODE>acknowledge</CODE> on a consumed message, a client acknowledges all messages consumed by the
	 * session that the message was delivered to. Messages that have been received but not acknowledged may be
	 * redelivered.
	 * <P>
	 * NOTE(review): this method performs no closed-session check of its own; any such failure would have to come from
	 * the individual consumers - confirm.
	 *
	 * @throws JMSException
	 *            if a consumer fails to acknowledge its messages.
	 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
	 */
	public void acknowledge() throws JMSException
	{
		for (final ActiveMQMessageConsumer consumer : consumers)
		{
			consumer.acknowledge();
		}
	}

	/**
	 * Registers a message consumer with this session and makes this session the dispatcher for the consumer's id on
	 * the connection. Durable subscribers are additionally counted in the session statistics.
	 *
	 * @param consumer
	 *           - message consumer.
	 * @throws JMSException
	 */
	protected void addConsumer(final ActiveMQMessageConsumer consumer) throws JMSException
	{
		consumers.add(consumer);

		final boolean durable = consumer.isDurableSubscriber();
		if (durable)
		{
			stats.onCreateDurableSubscriber();
		}

		connection.addDispatcher(consumer.getConsumerId(), this);
	}

	/**
	 * Removes the message consumer from this session and unregisters it from the connection, both by consumer id and
	 * by consumer instance. Durable subscribers are additionally removed from the session statistics.
	 *
	 * @param consumer
	 *           - consumer to be removed.
	 */
	protected void removeConsumer(final ActiveMQMessageConsumer consumer)
	{
		connection.removeDispatcher(consumer.getConsumerId());

		final boolean durable = consumer.isDurableSubscriber();
		if (durable)
		{
			stats.onRemoveDurableSubscriber();
		}

		consumers.remove(consumer);
		connection.removeDispatcher(consumer);
	}

	/**
	 * Registers a message producer with this session and, under its producer id, with the connection.
	 *
	 * @param producer
	 *           - message producer to be added.
	 * @throws JMSException
	 */
	protected void addProducer(final ActiveMQMessageProducer producer) throws JMSException
	{
		producers.add(producer);
		connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
	}

	/**
	 * Unregisters a message producer from the connection (by producer id) and then from this session.
	 *
	 * @param producer
	 *           - message producer to be removed.
	 */
	protected void removeProducer(final ActiveMQMessageProducer producer)
	{
		connection.removeProducer(producer.getProducerInfo().getProducerId());
		producers.remove(producer);
	}

	/**
	 * Starts this session: marks it as started, starts every registered consumer and finally starts the executor.
	 *
	 * @throws JMSException
	 */
	protected void start() throws JMSException
	{
		started.set(true);
		for (final ActiveMQMessageConsumer consumer : consumers)
		{
			consumer.start();
		}
		executor.start();
	}

	/**
	 * Stops this session: stops every registered consumer, then clears the started flag and stops the executor.
	 *
	 * @throws JMSException
	 */
	protected void stop() throws JMSException
	{
		for (final ActiveMQMessageConsumer consumer : consumers)
		{
			consumer.stop();
		}

		// The flag is only cleared once all consumers have been stopped.
		started.set(false);
		executor.stop();
	}

	/**
	 * Returns the session id held by this session's info object.
	 *
	 * @return value - session id.
	 */
	protected SessionId getSessionId()
	{
		final SessionId sessionId = this.info.getSessionId();
		return sessionId;
	}

	/**
	 * Builds a new consumer id from this session's id and the next value of the consumer id sequence.
	 *
	 * @return a unique ConsumerId instance.
	 */
	protected ConsumerId getNextConsumerId()
	{
		final SessionId sessionId = info.getSessionId();
		return new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
	}

	/**
	 * Builds a new producer id from this session's id and the next value of the producer id sequence.
	 *
	 * @return a unique ProducerId instance.
	 */
	protected ProducerId getNextProducerId()
	{
		final SessionId sessionId = info.getSessionId();
		return new ProducerId(sessionId, producerIdGenerator.getNextSequenceId());
	}

	/**
	 * Sends the message for dispatch by the broker.
	 * <P>
	 * The whole operation runs while holding <CODE>sendMutex</CODE>. The JMS header fields are written onto the message
	 * supplied by the caller, the message is transformed into the provider's own <CODE>ActiveMQMessage</CODE> format,
	 * and the result is then handed to the connection either asynchronously or synchronously.
	 *
	 * @param producer
	 *           - message producer; supplies the message sequence number, the producer id and the
	 *           disable-timestamp setting.
	 * @param destination
	 *           - message destination.
	 * @param message
	 *           - message to be sent.
	 * @param deliveryMode
	 *           - JMS message delivery mode.
	 * @param priority
	 *           - message priority.
	 * @param timeToLive
	 *           - message expiration; only turned into an expiration time when it is greater than zero and the
	 *           producer has not disabled message timestamps.
	 * @param producerWindow
	 *           - if not null, its usage is increased by the message size after an asynchronous send.
	 * @param sendTimeout
	 *           - when greater than zero (and no completion callback is given) the message is sent synchronously with
	 *           this timeout.
	 * @param onComplete
	 *           - optional completion callback; when not null the message is always sent through the synchronous
	 *           path, which receives the callback.
	 * @throws JMSException
	 *            if sending fails.
	 * @throws InvalidDestinationException
	 *            if the destination is temporary and the connection reports it as deleted.
	 */
	protected void send(final ActiveMQMessageProducer producer, final ActiveMQDestination destination, final Message message,
			final int deliveryMode, final int priority, final long timeToLive, final MemoryUsage producerWindow,
			final int sendTimeout, final AsyncCallback onComplete) throws JMSException
	{

		checkClosed();
		// Refuse to publish to a temporary destination that has already been deleted on this connection.
		if (destination.isTemporary() && connection.isDeleted(destination))
		{
			throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
		}
		synchronized (sendMutex)
		{
			// tell the Broker we are about to start a new transaction
			doStartTransaction();
			final TransactionId txid = transactionContext.getTransactionId();
			final long sequenceNumber = producer.getMessageSequence();

			//Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
			message.setJMSDeliveryMode(deliveryMode);
			// Expiration stays 0 unless timestamps are enabled and a positive time-to-live was given.
			long expiration = 0L;
			if (!producer.getDisableMessageTimestamp())
			{
				final long timeStamp = System.currentTimeMillis();
				message.setJMSTimestamp(timeStamp);
				if (timeToLive > 0)
				{
					expiration = timeToLive + timeStamp;
				}
			}
			message.setJMSExpiration(expiration);
			message.setJMSPriority(priority);
			message.setJMSRedelivered(false);

			// transform to our own message format here
			ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
			msg.setDestination(destination);
			msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));

			// Set the message id.
			// The transformation produced a different object, so copy id and destination back to the caller's message.
			if (msg != message)
			{
				message.setJMSMessageID(msg.getMessageId().toString());
				// Make sure the JMS destination is set on the foreign messages too.
				message.setJMSDestination(destination);
			}
			//clear the brokerPath in case we are re-sending this message
			msg.setBrokerPath(null);

			msg.setTransactionId(txid);
			// Optionally send a copy instead of the transformed instance itself.
			if (connection.isCopyMessageOnSend())
			{
				msg = (ActiveMQMessage) msg.copy();
			}
			msg.setConnection(connection);
			msg.onSend();
			msg.setProducerId(msg.getMessageId().getProducerId());
			if (LOG.isTraceEnabled())
			{
				LOG.trace(getSessionId() + " sending message: " + msg);
			}
			// Asynchronous send requires: no completion callback, no send timeout, no response required, sync sends
			// not forced on the connection, and the message being non-persistent, async-send enabled, or transacted.
			if (onComplete == null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend()
					&& (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null))
			{
				this.connection.asyncSendPacket(msg);
				if (producerWindow != null)
				{
					// Since we defer lots of the marshaling till we hit the
					// wire, this might not
					// provide an accurate size. We may change over to doing
					// more aggressive marshaling,
					// to get more accurate sizes.. this is more important once
					// users start using producer window
					// flow control.
					final int size = msg.getSize();
					producerWindow.increaseUsage(size);
				}
			}
			else
			{
				// Synchronous send: with a timeout when one is set and there is no callback, otherwise passing
				// the (possibly null) callback.
				if (sendTimeout > 0 && onComplete == null)
				{
					this.connection.syncSendPacket(msg, sendTimeout);
				}
				else
				{
					this.connection.syncSendPacket(msg, onComplete);
				}
			}

		}
	}

	/**
	 * Begins a local transaction on the transaction context when this session is transacted and is not currently
	 * taking part in an XA transaction; otherwise does nothing.
	 *
	 * @throws JMSException
	 *            if some internal error occurs
	 */
	protected void doStartTransaction() throws JMSException
	{
		if (!getTransacted() || transactionContext.isInXATransaction())
		{
			return;
		}
		transactionContext.begin();
	}

	/**
	 * Checks whether the session has unconsumed messages, as reported by the session executor.
	 *
	 * @return true - if there are unconsumed messages.
	 */
	public boolean hasUncomsumedMessages()
	{
		final boolean pending = this.executor.hasUncomsumedMessages();
		return pending;
	}

	/**
	 * Checks whether the session uses transactions: either its acknowledgement mode is
	 * <CODE>Session.SESSION_TRANSACTED</CODE> or the transaction context is in an XA transaction.
	 *
	 * @return true - if the session uses transactions.
	 */
	public boolean isTransacted()
	{
		if (this.acknowledgementMode == Session.SESSION_TRANSACTED)
		{
			return true;
		}
		return transactionContext.isInXATransaction();
	}

	/**
	 * Checks whether the session's acknowledgement mode is <CODE>Session.CLIENT_ACKNOWLEDGE</CODE>.
	 *
	 * @return true - if the session uses client acknowledgment.
	 */
	protected boolean isClientAcknowledge()
	{
		return Session.CLIENT_ACKNOWLEDGE == acknowledgementMode;
	}

	/**
	 * Checks whether the session's acknowledgement mode is <CODE>Session.AUTO_ACKNOWLEDGE</CODE>.
	 *
	 * @return true - if the session uses auto acknowledgment.
	 */
	public boolean isAutoAcknowledge()
	{
		return Session.AUTO_ACKNOWLEDGE == this.acknowledgementMode;
	}

	/**
	 * Checks whether the session's acknowledgement mode is <CODE>Session.DUPS_OK_ACKNOWLEDGE</CODE>.
	 *
	 * @return true - if the session uses dups-ok acknowledgment.
	 */
	public boolean isDupsOkAcknowledge()
	{
		return Session.DUPS_OK_ACKNOWLEDGE == this.acknowledgementMode;
	}

	/**
	 * Checks whether the session's acknowledgement mode is <CODE>ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE</CODE>.
	 *
	 * @return true - if the session uses individual acknowledgment.
	 */
	public boolean isIndividualAcknowledge()
	{
		return ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE == this.acknowledgementMode;
	}

	/**
	 * Returns the message delivery listener currently set on this session.
	 *
	 * @return deliveryListener - message delivery listener.
	 */
	public DeliveryListener getDeliveryListener()
	{
		return this.deliveryListener;
	}

	/**
	 * Sets the message delivery listener, replacing any listener stored previously.
	 *
	 * @param deliveryListener
	 *           - message delivery listener.
	 */
	public void setDeliveryListener(final DeliveryListener deliveryListener)
	{
		this.deliveryListener = deliveryListener;
	}

	/**
	 * Builds a new SessionInfo bean from the connection's info and the value of this session's id. A fresh instance is
	 * created on every call.
	 *
	 * @return info - SessionInfo bean.
	 * @throws JMSException
	 */
	protected SessionInfo getSessionInfo() throws JMSException
	{
		return new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
	}

	/**
	 * Send the asynchronous command by forwarding it to the connection.
	 *
	 * @param command
	 *           - command to be executed.
	 * @throws JMSException
	 */
	public void asyncSendPacket(final Command command) throws JMSException
	{
		this.connection.asyncSendPacket(command);
	}

	/**
	 * Send the synchronous command by forwarding it to the connection and returning the connection's response.
	 *
	 * @param command
	 *           - command to be executed.
	 * @return Response
	 * @throws JMSException
	 */
	public Response syncSendPacket(final Command command) throws JMSException
	{
		final Response response = this.connection.syncSendPacket(command);
		return response;
	}

	/**
	 * Returns the next value of this session's delivery id sequence.
	 *
	 * @return the next delivery id.
	 */
	public long getNextDeliveryId()
	{
		final long deliveryId = this.deliveryIdGenerator.getNextSequenceId();
		return deliveryId;
	}

	/**
	 * Drains the given channel and hands every drained message back to this session's executor.
	 * <P>
	 * Each drained message is first passed to the connection's <CODE>rollbackDuplicate</CODE>; afterwards the list is
	 * reversed and every entry is submitted through <CODE>executeFirst</CODE>.
	 *
	 * @param dispatcher
	 *           the dispatcher the messages are rolled back for
	 * @param unconsumedMessages
	 *           the channel whose pending messages are removed and redispatched
	 * @throws JMSException
	 */
	public void redispatch(final ActiveMQDispatcher dispatcher, final MessageDispatchChannel unconsumedMessages)
			throws JMSException
	{
		final List<MessageDispatch> pending = unconsumedMessages.removeAll();
		for (final MessageDispatch dispatch : pending)
		{
			this.connection.rollbackDuplicate(dispatcher, dispatch.getMessage());
		}

		// NOTE(review): presumably executeFirst inserts at the head, so submitting in reverse keeps the
		// original relative order - confirm against the executor.
		Collections.reverse(pending);
		for (final MessageDispatch dispatch : pending)
		{
			this.executor.executeFirst(dispatch);
		}
	}

	/**
	 * Reports the current value of the started flag, which is set by <CODE>start()</CODE> and cleared by
	 * <CODE>stop()</CODE>.
	 *
	 * @return true - if the session is currently started.
	 */
	public boolean isRunning()
	{
		final boolean running = this.started.get();
		return running;
	}

	/**
	 * @return Returns the asyncDispatch flag; this value is passed to the receivers and subscribers this session
	 *         creates.
	 */
	public boolean isAsyncDispatch()
	{
		return this.asyncDispatch;
	}

	/**
	 * @param asyncDispatch
	 *           The asyncDispatch to set.
	 */
	public void setAsyncDispatch(final boolean asyncDispatch)
	{
		this.asyncDispatch = asyncDispatch;
	}

	/**
	 * @return Returns the current value of the sessionAsyncDispatch flag.
	 */
	public boolean isSessionAsyncDispatch()
	{
		return this.sessionAsyncDispatch;
	}

	/**
	 * Stores a new value for the sessionAsyncDispatch flag.
	 *
	 * @param sessionAsyncDispatch
	 *           The sessionAsyncDispatch to set.
	 */
	public void setSessionAsyncDispatch(final boolean sessionAsyncDispatch)
	{
		this.sessionAsyncDispatch = sessionAsyncDispatch;
	}

	/**
	 * @return the message transformer currently set on this session.
	 */
	public MessageTransformer getTransformer()
	{
		return this.transformer;
	}

	/**
	 * @return the connection this session delegates to.
	 */
	public ActiveMQConnection getConnection()
	{
		return this.connection;
	}

	/**
	 * Sets the transformer used to transform messages before they are sent on to the JMS bus or when they are received
	 * from the bus but before they are delivered to the JMS client
	 *
	 * @param transformer
	 *           the transformer to store on this session.
	 */
	public void setTransformer(final MessageTransformer transformer)
	{
		this.transformer = transformer;
	}

	/**
	 * @return the BLOB transfer policy currently set on this session.
	 */
	public BlobTransferPolicy getBlobTransferPolicy()
	{
		return this.blobTransferPolicy;
	}

	/**
	 * Sets the policy used to describe how out-of-band BLOBs (Binary Large OBjects) are transferred from producers to
	 * brokers to consumers
	 *
	 * @param blobTransferPolicy
	 *           the policy to store on this session.
	 */
	public void setBlobTransferPolicy(final BlobTransferPolicy blobTransferPolicy)
	{
		this.blobTransferPolicy = blobTransferPolicy;
	}

	/**
	 * @return the unconsumed messages as reported by the session executor.
	 */
	public List<MessageDispatch> getUnconsumedMessages()
	{
		final List<MessageDispatch> unconsumed = this.executor.getUnconsumedMessages();
		return unconsumed;
	}

	/**
	 * @return a description containing the session id, the started flag and the send mutex.
	 */
	@Override
	public String toString()
	{
		final StringBuilder text = new StringBuilder("ActiveMQSession {id=");
		text.append(info.getSessionId());
		text.append(",started=").append(started.get());
		text.append("} ").append(sendMutex);
		return text.toString();
	}

	/**
	 * Verifies that neither this session nor any of its consumers has a message listener set.
	 *
	 * @throws JMSException
	 *            an <CODE>IllegalStateException</CODE> if the session's message listener is not null or any consumer
	 *            reports having a message listener.
	 */
	public void checkMessageListener() throws JMSException
	{
		final String listenerPresent = "Cannot synchronously receive a message when a MessageListener is set";

		if (messageListener != null)
		{
			throw new IllegalStateException(listenerPresent);
		}
		for (final ActiveMQMessageConsumer consumer : consumers)
		{
			if (consumer.hasMessageListener())
			{
				throw new IllegalStateException(listenerPresent);
			}
		}
	}

	/**
	 * Applies the given optimize-acknowledge setting to every consumer registered with this session.
	 *
	 * @param value
	 *           the setting passed to each consumer.
	 */
	protected void setOptimizeAcknowledge(final boolean value)
	{
		for (final ActiveMQMessageConsumer consumer : consumers)
		{
			consumer.setOptimizeAcknowledge(value);
		}
	}

	/**
	 * Sets the prefetch size on the first registered consumer whose id equals the given id. Does nothing when no
	 * consumer matches.
	 *
	 * @param id
	 *           the id of the consumer to update.
	 * @param prefetch
	 *           the prefetch size to apply.
	 */
	protected void setPrefetchSize(final ConsumerId id, final int prefetch)
	{
		for (final ActiveMQMessageConsumer consumer : consumers)
		{
			if (!consumer.getConsumerId().equals(id))
			{
				continue;
			}
			consumer.setPrefetchSize(prefetch);
			return;
		}
	}

	/**
	 * Closes the first registered consumer whose id equals the given id. Does nothing when no consumer matches.
	 * <P>
	 * A <CODE>JMSException</CODE> raised while closing is logged as a warning and not propagated; the "closed"
	 * warning is logged in either case.
	 *
	 * @param id
	 *           the id of the consumer to close.
	 */
	protected void close(final ConsumerId id)
	{
		for (final ActiveMQMessageConsumer consumer : consumers)
		{
			if (!consumer.getConsumerId().equals(id))
			{
				continue;
			}

			try
			{
				consumer.close();
			}
			catch (final JMSException closeFailure)
			{
				LOG.warn("Exception closing consumer", closeFailure);
			}
			LOG.warn("Closed consumer on Command, " + id);
			return;
		}
	}

	/**
	 * Checks whether any consumer registered with this session reports the given temporary destination as in use.
	 *
	 * @param destination
	 *           the temporary destination to look for.
	 * @return true - if at least one consumer reports the destination as in use.
	 */
	public boolean isInUse(final ActiveMQTempDestination destination)
	{
		for (final ActiveMQMessageConsumer consumer : consumers)
		{
			if (consumer.isInUse(destination))
			{
				return true;
			}
		}
		return false;
	}

	/**
	 * Highest sequence id of the last message delivered by this session. Passed to the broker in the close command,
	 * maintained by dispose().
	 *
	 * @return lastDeliveredSequenceId
	 */
	public long getLastDeliveredSequenceId()
	{
		return this.lastDeliveredSequenceId;
	}

	/**
	 * Sends the acknowledgement in non-lazy mode; see {@link #sendAck(MessageAck, boolean)}.
	 *
	 * @param ack
	 *           the acknowledgement to send.
	 * @throws JMSException
	 */
	protected void sendAck(final MessageAck ack) throws JMSException
	{
		final boolean lazy = false;
		sendAck(ack, lazy);
	}

	/**
	 * Sends the acknowledgement, asynchronously when the call is lazy, the connection sends acks asynchronously or the
	 * session is transacted, and synchronously otherwise.
	 *
	 * @param ack
	 *           the acknowledgement to send.
	 * @param lazy
	 *           if true the acknowledgement is always sent asynchronously.
	 * @throws JMSException
	 */
	protected void sendAck(final MessageAck ack, final boolean lazy) throws JMSException
	{
		final boolean sendAsync = lazy || connection.isSendAcksAsync() || getTransacted();
		if (!sendAsync)
		{
			syncSendPacket(ack);
			return;
		}
		asyncSendPacket(ack);
	}

	/**
	 * @return the scheduler obtained from the connection.
	 * @throws JMSException
	 */
	protected Scheduler getScheduler() throws JMSException
	{
		final Scheduler scheduler = connection.getScheduler();
		return scheduler;
	}

	/**
	 * @return the thread pool executor held in the connectionExecutor field.
	 */
	protected ThreadPoolExecutor getConnectionExecutor()
	{
		return connectionExecutor;
	}

	/**
	 * JMS 2.0 durable consumer factory; not supported by this session.
	 * <P>
	 * Fails fast instead of returning <CODE>null</CODE>, so that callers get a clear error here rather than a
	 * <CODE>NullPointerException</CODE> when they first use the returned consumer.
	 *
	 * @param arg0
	 *           the topic to subscribe to (unused)
	 * @param arg1
	 *           the name used to identify the subscription (unused)
	 * @return never returns normally
	 * @throws JMSException
	 *            declared by the interface; not thrown by this implementation.
	 * @throws UnsupportedOperationException
	 *            always.
	 */
	@Override
	public MessageConsumer createDurableConsumer(final Topic arg0, final String arg1) throws JMSException
	{
		throw new UnsupportedOperationException("createDurableConsumer(Topic, String) is not supported");
	}

	/**
	 * JMS 2.0 durable consumer factory with selector and noLocal flag; not supported by this session.
	 * <P>
	 * Fails fast instead of returning <CODE>null</CODE>, so that callers get a clear error here rather than a
	 * <CODE>NullPointerException</CODE> when they first use the returned consumer.
	 *
	 * @param arg0
	 *           the topic to subscribe to (unused)
	 * @param arg1
	 *           the name used to identify the subscription (unused)
	 * @param arg2
	 *           the message selector (unused)
	 * @param arg3
	 *           the noLocal flag (unused)
	 * @return never returns normally
	 * @throws JMSException
	 *            declared by the interface; not thrown by this implementation.
	 * @throws UnsupportedOperationException
	 *            always.
	 */
	@Override
	public MessageConsumer createDurableConsumer(final Topic arg0, final String arg1, final String arg2, final boolean arg3)
			throws JMSException
	{
		throw new UnsupportedOperationException(
				"createDurableConsumer(Topic, String, String, boolean) is not supported");
	}

	/**
	 * JMS 2.0 shared consumer factory; not supported by this session.
	 * <P>
	 * Fails fast instead of returning <CODE>null</CODE>, so that callers get a clear error here rather than a
	 * <CODE>NullPointerException</CODE> when they first use the returned consumer.
	 *
	 * @param arg0
	 *           the topic to subscribe to (unused)
	 * @param arg1
	 *           the shared subscription name (unused)
	 * @return never returns normally
	 * @throws JMSException
	 *            declared by the interface; not thrown by this implementation.
	 * @throws UnsupportedOperationException
	 *            always.
	 */
	@Override
	public MessageConsumer createSharedConsumer(final Topic arg0, final String arg1) throws JMSException
	{
		throw new UnsupportedOperationException("createSharedConsumer(Topic, String) is not supported");
	}

	/**
	 * JMS 2.0 shared consumer factory with selector; not supported by this session.
	 * <P>
	 * Fails fast instead of returning <CODE>null</CODE>, so that callers get a clear error here rather than a
	 * <CODE>NullPointerException</CODE> when they first use the returned consumer.
	 *
	 * @param arg0
	 *           the topic to subscribe to (unused)
	 * @param arg1
	 *           the shared subscription name (unused)
	 * @param arg2
	 *           the message selector (unused)
	 * @return never returns normally
	 * @throws JMSException
	 *            declared by the interface; not thrown by this implementation.
	 * @throws UnsupportedOperationException
	 *            always.
	 */
	@Override
	public MessageConsumer createSharedConsumer(final Topic arg0, final String arg1, final String arg2) throws JMSException
	{
		throw new UnsupportedOperationException("createSharedConsumer(Topic, String, String) is not supported");
	}

	/**
	 * JMS 2.0 shared durable consumer factory; not supported by this session.
	 * <P>
	 * Fails fast instead of returning <CODE>null</CODE>, so that callers get a clear error here rather than a
	 * <CODE>NullPointerException</CODE> when they first use the returned consumer.
	 *
	 * @param arg0
	 *           the topic to subscribe to (unused)
	 * @param arg1
	 *           the name used to identify the shared durable subscription (unused)
	 * @return never returns normally
	 * @throws JMSException
	 *            declared by the interface; not thrown by this implementation.
	 * @throws UnsupportedOperationException
	 *            always.
	 */
	@Override
	public MessageConsumer createSharedDurableConsumer(final Topic arg0, final String arg1) throws JMSException
	{
		throw new UnsupportedOperationException("createSharedDurableConsumer(Topic, String) is not supported");
	}

	/**
	 * JMS 2.0 shared durable consumer factory with selector; not supported by this session.
	 * <P>
	 * Fails fast instead of returning <CODE>null</CODE>, so that callers get a clear error here rather than a
	 * <CODE>NullPointerException</CODE> when they first use the returned consumer.
	 *
	 * @param arg0
	 *           the topic to subscribe to (unused)
	 * @param arg1
	 *           the name used to identify the shared durable subscription (unused)
	 * @param arg2
	 *           the message selector (unused)
	 * @return never returns normally
	 * @throws JMSException
	 *            declared by the interface; not thrown by this implementation.
	 * @throws UnsupportedOperationException
	 *            always.
	 */
	@Override
	public MessageConsumer createSharedDurableConsumer(final Topic arg0, final String arg1, final String arg2) throws JMSException
	{
		throw new UnsupportedOperationException("createSharedDurableConsumer(Topic, String, String) is not supported");
	}
}
